// We are using promtail client version v2.8.2:
// promtail must be added using commit hashes instead of version tags, see https://github.com/grafana/loki/issues/2826
// go get github.com/grafana/loki/clients/pkg/promtail/client@9f809eda70babaf583bdf6bf335a28038f286618

package telemetry

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/blendle/zapdriver"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"google.golang.org/api/option"

	gkzap "github.com/go-kit/kit/log/zap"
	"github.com/grafana/dskit/backoff"
	"github.com/grafana/dskit/flagext"
	"github.com/grafana/loki/clients/pkg/promtail/api"
	"github.com/grafana/loki/clients/pkg/promtail/client"
	"github.com/grafana/loki/pkg/logproto"
	lokiflag "github.com/grafana/loki/pkg/util/flagext"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
)

const (
	// bufferSize specifies how many log messages can be queued up locally before we start dropping them.
	// Dropped messages are still logged locally by zap; only the Loki copy is lost.
	bufferSize = 1000

	// clientTimeout is how long we are willing to wait for Loki on shutdown. Note that this is an UPPER LIMIT. In the sunny day scenario, we won't need to wait.
	clientTimeout = 250 * time.Millisecond
)

// ExternalLoggerLoki implements ExternalLogger for the Grafana Loki cloud logging.
// ExternalLoggerLoki implements ExternalLogger for the Grafana Loki cloud logging.
type ExternalLoggerLoki struct {
	// c is the promtail client. Its internal channel is unbuffered; see bufferedChan below.
	c client.Client

	// labels is the set of labels to be added to each log entry, based on the severity (since severity is one of the labels).
	// Precomputed per level in NewLokiCloudLogger so we don't copy the label map on every log call.
	labels map[zapcore.Level]model.LabelSet

	// localLogger is the zap localLogger used to log errors generated by the loki adapter. It does not use telemetry.
	localLogger *zap.Logger

	// bufferedChan is used to buffer log messages so that the app does not block on Loki. The Loki internal channel is unbuffered, so we
	// write things to this channel. If this write would block, we peg a metric and drop the log message (although it still gets logged locally).
	// There is then a worker routine picking messages off of this local channel and writing them to the Loki channel in a blocking manner.
	bufferedChan chan api.Entry

	// cancelWorker is used to cancel the worker routine on shutdown.
	cancelWorker context.CancelFunc

	// workerExitedC is used by the worker to signal that it has exited. Buffered (capacity 1) so the worker never blocks on it.
	workerExitedC chan struct{}
}

// Prometheus counters tracking delivery of log messages into the local buffered channel.
// Note that "sent" means successfully queued locally, not confirmed received by Loki.
var (
	lokiMessagesSent = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "wormhole_loki_messages_sent",
			Help: "Total number of log messages posted to Loki",
		})

	lokiMessagesDropped = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "wormhole_loki_messages_dropped",
			Help: "Total number of log messages dropped while posting to Loki",
		})
)

// log queues a single log message for delivery to Loki. The send to the local buffered
// channel is non-blocking: if the buffer is full, the message is dropped (it is still
// logged locally by zap elsewhere) and the drop counter is pegged.
// ts is the log timestamp, message is the already-encoded JSON log line, and level
// selects the precomputed label set (severity is one of the labels).
func (logger *ExternalLoggerLoki) log(ts time.Time, message json.RawMessage, level zapcore.Level) {
	lokiLabels := logger.labels[level]

	// Renamed local from `bytes` to `line` so it doesn't shadow the standard library package name.
	line, err := message.MarshalJSON()
	if err != nil {
		logger.localLogger.Error("Failed to marshal log message", zap.Error(err))
		return
	}
	entry := api.Entry{
		Entry: logproto.Entry{
			Timestamp: ts,
			Line:      string(line),
		},

		Labels: lokiLabels,
	}

	// Non-blocking send: the application must never stall on telemetry.
	select {
	case logger.bufferedChan <- entry:
		lokiMessagesSent.Inc()
	default:
		lokiMessagesDropped.Inc()
	}

	// A fatal error exits, which can cause us to lose messages. Shut down the worker so it will flush the logs.
	if level == zapcore.FatalLevel {
		logger.stopWorkerWithTimeout()
	}
}

// close shuts down the Loki adapter by stopping the worker routine, which in turn
// flushes buffered messages and stops the Loki client (bounded by timeouts throughout).
func (logger *ExternalLoggerLoki) close() {
	// Shut down the worker and wait for it to exit. It has a timeout so we won't wait forever.
	logger.stopWorkerWithTimeout()
}

// NewLokiCloudLogger creates a new Telemetry logger using Grafana Loki Cloud Logging.
// It builds the promtail client, precomputes per-severity label sets, and starts the
// worker routine that moves entries from the local buffered channel into the Loki client.
// skipPrivateLogs: if set to `true`, logs with the field zap.Bool("_privateLogEntry", true) will not be logged by telemetry.
// The trailing option.ClientOption variadic is accepted for interface compatibility and ignored.
func NewLokiCloudLogger(ctx context.Context, logger *zap.Logger, url string, productName string, skipPrivateLogs bool, labels map[string]string, _ ...option.ClientOption) (*Telemetry, error) {
	// The localLogger is used to log errors generated by the loki adapter. It does not use telemetry.
	localLogger := logger.With(zap.String("component", "loki"))

	// The gkLogger is passed into the loki client, which expects a go-kit logger.
	gkLogger := gkzap.NewZapSugarLogger(localLogger, zapcore.ErrorLevel)

	// Loki pegs these metrics: https://github.com/grafana/loki/blob/main/clients/pkg/promtail/client/client.go#L71-L127
	m := client.NewMetrics(prometheus.DefaultRegisterer)

	serverURL := flagext.URLValue{}
	err := serverURL.Set(url)
	if err != nil {
		// Wrap with %w so callers can unwrap the underlying parse error with errors.Is / errors.As.
		return nil, fmt.Errorf("failed to parse Loki client url: %w", err)
	}

	cfg := client.Config{
		URL:                    serverURL,
		DropRateLimitedBatches: true,
		Client:                 config.HTTPClientConfig{},
		// TenantID: We are not using the tenantID.

		// Using default values from promtail:
		// https://github.com/grafana/loki/blob/bad691b5091f1ad2f09dbfb30d5395b8f57a3bcd/docs/sources/clients/promtail/configuration.md
		BatchWait:      1 * time.Minute,
		BatchSize:      1048576,
		BackoffConfig:  backoff.Config{MinBackoff: 500 * time.Millisecond, MaxBackoff: 5 * time.Minute, MaxRetries: 10},
		ExternalLabels: lokiflag.LabelSet{},
		Timeout:        10 * time.Second,
	}

	clientMaxLineSize := 1024
	clientMaxLineSizeTruncate := true

	c, err := client.New(m, cfg, 0, clientMaxLineSize, clientMaxLineSizeTruncate, gkLogger)
	if err != nil {
		return nil, fmt.Errorf("failed to create Loki client: %w", err)
	}

	// Since severity is one of the labels, create a label set for each severity to avoid copying the labels map for each log entry.
	lokiLabels := make(map[zapcore.Level]model.LabelSet)
	for level := zapcore.DebugLevel; level <= zapcore.FatalLevel; level++ {
		levLabels := model.LabelSet{}
		for k, v := range labels {
			levLabels[model.LabelName(k)] = model.LabelValue(v)
		}
		levLabels[model.LabelName("product")] = model.LabelValue(productName)
		levLabels[model.LabelName("severity")] = model.LabelValue(level.CapitalString())
		lokiLabels[level] = levLabels
	}

	// Create a buffered channel so the application does not block in the logger.
	bufferedChan := make(chan api.Entry, bufferSize)

	// Create a local context with a cancel function so we can signal our worker to shutdown when the time comes.
	// Cancelling the worker also closes the Loki client.
	workerContext, cancelWorker := context.WithCancel(ctx)

	// Create a channel used by the worker to signal that it has exited.
	workerExitedC := make(chan struct{}, 1)

	// Kick off the worker to read from the local buffered channel and write to the Loki unbuffered channel.
	go logWriter(workerContext, localLogger, bufferedChan, workerExitedC, c)

	return &Telemetry{
		encoder: &guardianTelemetryEncoder{
			Encoder: zapcore.NewJSONEncoder(zapdriver.NewProductionEncoderConfig()),
			logger: &ExternalLoggerLoki{
				c:             c,
				labels:        lokiLabels,
				localLogger:   localLogger,
				bufferedChan:  bufferedChan,
				cancelWorker:  cancelWorker,
				workerExitedC: workerExitedC,
			},
			skipPrivateLogs: skipPrivateLogs,
		},
	}, nil
}

// logWriter is the go routine that takes log messages off the buffered channel and posts them to the Loki client. It can block
// on the Loki client until our context is canceled, meaning we are shutting down. On shutdown, it tries to flush buffered messages
// and shutdown the Loki client using a timeout for both actions.
func logWriter(ctx context.Context, logger *zap.Logger, localC chan api.Entry, workerExitedC chan struct{}, c client.Client) {
	// pendingEntry is used to save the last log message if the write to Loki is interrupted by the context being canceled. We will attempt to flush it.
	var pendingEntry *api.Entry

	for {
		select {
		case entry, ok := <-localC:
			if !ok {
				// We never close localC ourselves, so this is an unexpected state; exit cleanly anyway.
				logger.Error("Loki log writer is exiting because the buffered channel has been closed")
				cleanUpWorker(logger, workerExitedC, c)
				return
			}

			// Write to Loki in a blocking manner unless we are signaled to shutdown.
			select {
			case c.Chan() <- entry: //nolint:channelcheck // We want to block on the Loki client.
				pendingEntry = nil
			case <-ctx.Done():
				// Time to shutdown. We probably failed to write this message, save it so we can try to flush it.
				// Note: `entry` is scoped to this iteration of the loop, so taking its address here is safe.
				pendingEntry = &entry
			}
		case <-ctx.Done():
			logger.Info("Loki log writer shutting down")

			// Flush as much as we can in our allowed time.
			if numRemaining, err := flushLogsWithTimeout(localC, c, pendingEntry); err != nil {
				logger.Error("worker failed to flush logs", zap.Error(err), zap.Int("numEventsRemaining", numRemaining))
			}

			cleanUpWorker(logger, workerExitedC, c)
			return
		}
	}
}

// flushLogsWithTimeout is used to flush any buffered log messages on shutdown.
// It uses a timeout so that we only delay guardian shutdown for so long.
// It returns the number of entries left unflushed and a non-nil error if the timeout fired.
func flushLogsWithTimeout(localC chan api.Entry, c client.Client, pendingEntry *api.Entry) (int, error) {
	// Create a timeout context. Base it on the background one since ours has been canceled.
	// We are using a timeout context rather than `time.After` here because that is the maximum
	// we want to wait, rather than a per-event timeout.
	timeout, cancel := context.WithTimeout(context.Background(), clientTimeout)
	defer cancel()

	// First retry the entry whose write to Loki was interrupted by the shutdown signal, if any.
	if pendingEntry != nil {
		select {
		case c.Chan() <- *pendingEntry: //nolint:channelcheck // We want to block on the Loki client. The timeout will interrupt us.
		case <-timeout.Done():
			// If we timeout, we didn't write the pending one, so count that as remaining.
			return (1 + len(localC)), errors.New("timeout writing pending entry")
		}
	}

	for len(localC) > 0 {
		// We are the only reader of localC at this point, so with len > 0 this receive does not block.
		entry := <-localC

		// The send to the Loki channel is what can actually block, so it must be the operation
		// guarded by the timeout. (Previously the select guarded only the receive from localC,
		// so a stuck Loki client could hang shutdown indefinitely.)
		select {
		case c.Chan() <- entry: //nolint:channelcheck // We want to block on the Loki client. The timeout will interrupt us.
		case <-timeout.Done():
			// If we timeout, we didn't write the current one, so count that as remaining.
			return (1 + len(localC)), errors.New("timeout flushing buffered entry")
		}
	}

	return 0, nil
}

// cleanUpWorker is called when the worker is shutting down. It closes the Loki client connection and signals that the worker has exited.
func cleanUpWorker(logger *zap.Logger, workerExitedC chan struct{}, c client.Client) {
	// Shut down the Loki client, bounded by a timeout so we cannot hang here.
	if stopErr := stopClientWithTimeout(c); stopErr != nil {
		logger.Error("worker failed to stop Loki client", zap.Error(stopErr))
	}

	// Let whoever is waiting on us know that we are done. The channel is buffered with
	// capacity one, so the default case only fires if a prior signal was never consumed.
	select {
	case workerExitedC <- struct{}{}:
		logger.Info("Loki log writer exiting")
	default:
		logger.Error("Loki log writer failed to write the exited flag, exiting anyway")
	}
}

// stopClientWithTimeout calls the Loki client shutdown function using a timeout so that we only delay guardian shutdown for so long.
func stopClientWithTimeout(c client.Client) error {
	// Call the stop function in a go routine so we can use a timeout.
	stopExitedC := make(chan struct{}, 1)
	go func(c client.Client) {
		c.StopNow()
		stopExitedC <- struct{}{} //nolint:channelcheck // We only do a single write.
	}(c)

	// Wait for the go routine to exit or the timer to expire. Using `time.After` since this is a one shot and we don't have the context.
	select {
	case <-stopExitedC:
		return nil
	case <-time.After(clientTimeout):
		return errors.New("timeout")
	}
}

// stopWorkerWithTimeout stops the log writer and waits for it to exit. It only waits a finite length of time.
func (logger *ExternalLoggerLoki) stopWorkerWithTimeout() {
	// Shut down the worker.
	logger.cancelWorker()

	// Wait for the worker to signal that it has exited. Use a timeout so we don't wait forever.
	// It could take up to twice the client timeout for the worker to exit. Wait a little longer than that.
	// Using `time.After` since this is a one shot and we don't have the context.
	select {
	case <-logger.workerExitedC:
	case <-time.After(3 * clientTimeout):
		logger.localLogger.Error("log writer failed to exit, giving up")
	}
}
